1 package org.apache.lucene.index; 2 3 /* 4 * Licensed to the Apache Software Foundation (ASF) under one or more 5 * contributor license agreements. See the NOTICE file distributed with 6 * this work for additional information regarding copyright ownership. 7 * The ASF licenses this file to You under the Apache License, Version 2.0 8 * (the "License"); you may not use this file except in compliance with 9 * the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 import java.util.IdentityHashMap; 20 import java.util.Map; 21 22 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; 23 import org.apache.lucene.util.InfoStream; 24 import org.apache.lucene.util.ThreadInterruptedException; 25 26 /** 27 * Controls the health status of a {@link DocumentsWriter} sessions. This class 28 * used to block incoming indexing threads if flushing significantly slower than 29 * indexing to ensure the {@link DocumentsWriter}s healthiness. If flushing is 30 * significantly slower than indexing the net memory used within an 31 * {@link IndexWriter} session can increase very quickly and easily exceed the 32 * JVM's available memory. 33 * <p> 34 * To prevent OOM Errors and ensure IndexWriter's stability this class blocks 35 * incoming threads from indexing once 2 x number of available 36 * {@link ThreadState}s in {@link DocumentsWriterPerThreadPool} is exceeded. 37 * Once flushing catches up and the number of flushing DWPT is equal or lower 38 * than the number of active {@link ThreadState}s threads are released and can 39 * continue indexing. 40 */ 41 final class DocumentsWriterStallControl { 42 43 private volatile boolean stalled; 44 private int numWaiting; // only with assert 45 private boolean wasStalled; // only with assert 46 private final Map<Thread, Boolean> waiting = new IdentityHashMap<>(); // only with assert 47 private final InfoStream infoStream; 48 49 DocumentsWriterStallControl(LiveIndexWriterConfig iwc) { 50 infoStream = iwc.getInfoStream(); 51 } 52 53 /** 54 * Update the stalled flag status. This method will set the stalled flag to 55 * <code>true</code> iff the number of flushing 56 * {@link DocumentsWriterPerThread} is greater than the number of active 57 * {@link DocumentsWriterPerThread}. Otherwise it will reset the 58 * {@link DocumentsWriterStallControl} to healthy and release all threads 59 * waiting on {@link #waitIfStalled()} 60 */ 61 synchronized void updateStalled(boolean stalled) { 62 this.stalled = stalled; 63 if (stalled) { 64 wasStalled = true; 65 } 66 notifyAll(); 67 } 68 69 /** 70 * Blocks if documents writing is currently in a stalled state. 71 * 72 */ 73 void waitIfStalled() { 74 if (stalled) { 75 synchronized (this) { 76 if (stalled) { // react on the first wakeup call! 77 // don't loop here, higher level logic will re-stall! 78 try { 79 incWaiters(); 80 // Defensive, in case we have a concurrency bug that fails to .notify/All our thread: 81 // just wait for up to 1 second here, and let caller re-stall if it's still needed: 82 wait(1000); 83 decrWaiters(); 84 } catch (InterruptedException e) { 85 throw new ThreadInterruptedException(e); 86 } 87 } 88 } 89 } 90 } 91 92 boolean anyStalledThreads() { 93 return stalled; 94 } 95 96 long stallStartNS; 97 98 private void incWaiters() { 99 stallStartNS = System.nanoTime(); 100 if (infoStream.isEnabled("DW") && numWaiting == 0) { 101 infoStream.message("DW", "now stalling flushes"); 102 } 103 numWaiting++; 104 assert waiting.put(Thread.currentThread(), Boolean.TRUE) == null; 105 assert numWaiting > 0; 106 } 107 108 private void decrWaiters() { 109 numWaiting--; 110 assert waiting.remove(Thread.currentThread()) != null; 111 assert numWaiting >= 0; 112 if (infoStream.isEnabled("DW") && numWaiting == 0) { 113 long stallEndNS = System.nanoTime(); 114 infoStream.message("DW", "done stalling flushes for " + ((stallEndNS - stallStartNS)/1000000.0) + " ms"); 115 } 116 } 117 118 synchronized boolean hasBlocked() { // for tests 119 return numWaiting > 0; 120 } 121 122 boolean isHealthy() { // for tests 123 return !stalled; // volatile read! 124 } 125 126 synchronized boolean isThreadQueued(Thread t) { // for tests 127 return waiting.containsKey(t); 128 } 129 130 synchronized boolean wasStalled() { // for tests 131 return wasStalled; 132 } 133 }